package com.uxsino.commons.db.queue;

import com.uxsino.commons.db.queue.dao.DBQDao;
import com.uxsino.commons.db.queue.entity.DBQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.transaction.Transactional;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * @author Wang Sihao
 * @create 2020/3/30
 */
@Service
class DBQueueService {
    private static final Logger LOG = LoggerFactory.getLogger(DBQueueService.class);
    private static final ReentrantLock writer = new ReentrantLock();
    @Autowired
    DBQDao dbqDao;

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Value("simo_queue_${spring.application.name:core}")
    private String tableName;

    private static AtomicLong COUNTER_SIZE = new AtomicLong(0);
    private static AtomicLong COUNTER_EMPTY = new AtomicLong(0);


    @Transactional
    public void clean(String name){
        String sql = "DELETE FROM "+ tableName +" WHERE queue_name = '"+name+"';";
        jdbcTemplate.execute(sql);
    }

    @Transactional
    public void clean(){
        String sql = "DELETE FROM "+ tableName + ";";
        jdbcTemplate.execute(sql);
    }

    @Transactional
    public void push(String name, Collection<String> data){
        if(data == null || data.isEmpty()){
            return;
        }
        writer.lock();
        try {
            dbqDao.push(data.stream().map(itm->new DBQ(name, itm)).collect(Collectors.toList()));
        }finally {
            writer.unlock();
        }
    }

    @Transactional
    public List<String> poll(String name, int size){
        List<DBQ> data = dbqDao.poll(name, size);
        if(!data.isEmpty()){
            dbqDao.deleteByIdIn(data.stream().map(itm->itm.getId()).collect(Collectors.toList()));
        }else{
            COUNTER_EMPTY.incrementAndGet();
        }
        List<String> result = data.stream().map(itm->itm.getData()).collect(Collectors.toList());
        return result;
    }

    public Long size(String queueName){
        LOG.info("dbqueue size execs: {}   empty query times: {}", COUNTER_SIZE.incrementAndGet(), COUNTER_EMPTY.get());
        return dbqDao.countByQueueName(queueName);
    }

    public Long size(){
        LOG.info("dbqueue size execs: {}   empty query times: {}", COUNTER_SIZE.incrementAndGet(), COUNTER_EMPTY.get());
        return dbqDao.count();
    }

    public void check(){
        try {
            jdbcTemplate.execute("vacuum  "+ tableName + "; ");
            LOG.info("---queue vacuum table {} done.", tableName);
        }catch (Exception e){
            LOG.info("---queue vacuum table {} error: {}", tableName, e);
        }
    }
}
